fix: workflow incorrectly marked as completed while nodes are still executing#83
Conversation
Code Review by Qodo (Alpha)
1. Invalid node IDs queued
|
| @@ -0,0 +1,308 @@ | |||
| """Unit tests for skip propagator.""" | |||
|
|
|||
| from typing import Any | |||
There was a problem hiding this comment.
1. any used in tests 📘 Rule Violation
• The new unit test introduces typing.Any and uses it in annotations for get_incoming_edges_side_effect, weakening static typing and making refactors/type-checking less effective. • This conflicts with the requirement to use strong typing and avoid overly-permissive types unless strictly necessary. • It may also mask incorrect mock usage/signatures that stronger types would catch earlier.
Agent Prompt
## Issue description
The new unit tests introduce `typing.Any` and use it in annotations, which violates the strong-typing guideline and reduces the effectiveness of type checking.
## Issue Context
These helper functions are only used as mock side effects and can be typed precisely (e.g., node IDs are `str`, and the functions return lists of `Edge`-like objects).
## Fix Focus Areas
- api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py[3-3]
- api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py[205-223]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| with self._lock: | ||
| self._graph.nodes[node_id].state = NodeState.TAKEN | ||
| self._ready_queue.put(node_id) | ||
| self._graph.nodes[node_id].state = NodeState.TAKEN |
There was a problem hiding this comment.
2. Invalid node ids queued 🐞 Bug
• enqueue_node() now pushes node_id into the ready queue before touching self._graph.nodes[node_id]; if the node ID is invalid, a KeyError is raised after the enqueue, leaving a poisoned queue item behind. • Workers dereference self._graph.nodes[node_id] outside the worker’s try: block, so a poisoned queue item can crash the worker thread and stall execution. • Resume flow enqueues paused_nodes without validating the node still exists, increasing the chance of this regression surfacing (e.g., stale snapshots or graph changes between pause/resume).
Agent Prompt
### Issue description
`GraphStateManager.enqueue_node()` currently enqueues `node_id` into the ready queue before verifying that `node_id` exists in `self._graph.nodes`. If the node ID is invalid/stale, the method raises after the enqueue, leaving a bad ID in the queue; workers then crash when dereferencing `graph.nodes[node_id]`.
### Issue Context
This is particularly risky in resume flows: `GraphEngine._start_execution(resume=True)` enqueues `paused_nodes` without validating membership in the current graph, and `GraphRuntimeState` does not validate paused IDs.
### Fix Focus Areas
- api/core/workflow/graph_engine/graph_state_manager.py[42-55]
- api/core/workflow/graph_engine/graph_engine.py[317-340]
- api/core/workflow/graph_engine/worker.py[100-112]
- api/core/workflow/runtime/graph_runtime_state.py[342-352]
### Suggested implementation sketch
- In `enqueue_node()`:
- resolve/validate the node first (`node = self._graph.nodes.get(node_id)`; if None -> raise or no-op depending on desired behavior)
- then `self._ready_queue.put(node_id)`
- then set `node.state = NodeState.TAKEN`
- (Optional hardening) Add a new `schedule_node()` API that performs enqueue + execution-tracking updates atomically and use it consistently across call sites.
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
Benchmark PR from qodo-benchmark#133